package com.xiaofan.scala

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * Flink streaming job that forwards text lines read from a socket into Redis.
 *
 * Each incoming line is paired with the fixed key `"l_words"` and handed to a
 * [[RedisSink]] configured with [[MyRedisMapper]].
 */
object StreamingWithRedisSink_B0005 {

  /**
   * Builds the streaming topology and starts the job.
   *
   * @param args command-line arguments; not read by this job
   */
  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: raw text lines from the socket at 192.168.1.27:9999.
    val lines: DataStream[String] = streamEnv.socketTextStream("192.168.1.27", 9999)

    // Tag every line with the constant key so the mapper can use it as the Redis key.
    val keyedLines: DataStream[(String, String)] = lines.map(line => ("l_words", line))

    // Connection settings for the Redis instance the sink writes to.
    val jedisConf: FlinkJedisPoolConfig =
      new FlinkJedisPoolConfig.Builder()
        .setHost("192.168.157.11")
        .setPort(6379)
        .build()

    // Sink: (key, value) pairs are translated to Redis commands by MyRedisMapper.
    keyedLines.addSink(new RedisSink[(String, String)](jedisConf, new MyRedisMapper()))

    streamEnv.execute("StreamingWithRedisSink_B0005")
  }
}

/**
 * Maps `(key, value)` string pairs onto Redis `LPUSH` commands.
 *
 * The first tuple element is used as the Redis key and the second as the
 * value passed to the command.
 */
class MyRedisMapper extends RedisMapper[(String, String)] {

  /** @return a command description selecting [[RedisCommand.LPUSH]] */
  override def getCommandDescription: RedisCommandDescription = {
    val command = RedisCommand.LPUSH
    new RedisCommandDescription(command)
  }

  /**
   * @param data the `(key, value)` pair being written
   * @return the first element of the pair, used as the Redis key
   */
  override def getKeyFromData(data: (String, String)): String = {
    val redisKey = data._1
    redisKey
  }

  /**
   * @param data the `(key, value)` pair being written
   * @return the second element of the pair, used as the Redis value
   */
  override def getValueFromData(data: (String, String)): String = {
    val redisValue = data._2
    redisValue
  }
}
